[Tests passing] [2.0] Add initial eq-to-pos delete job #356
Zyiqin-Miranda merged 5 commits into 2.0
Conversation
Force-pushed from 1c30d06 to 3d5149d
First version of the converter, with a test to verify correctness, is working here.
raghumdani left a comment:
Thanks for putting this full implementation together. Great work so far. A couple of things I think would be useful here:
- Modularize all the invocations to the catalog client so that we can write unit tests for it independently, and change it when internal catalog support is available.
- I would not compare the hashes here. Although the probability is low, collisions can theoretically occur and we cannot detect/recover from them.
- We can add e2e functional tests. I see only one sanity test so far.
- Would it be simpler to have a separate package for this implementation that uses deltacat as a dependency, since the dependency only goes one way? I fear we may create highly coupled functions over time, making maintenance of deltacat (with DeltaCAT 2.0) harder.
- Move all the short-term hacks, like the overrides, into a _private module to emphasize the danger of taking any dependency on those functions.
deltacat/compute/converter/equality_delete_to_position_delete_session.py (4 review threads outdated; resolved)
```python
    data_file_table["primarykey"],
    equality_delete_table["primarykey"],
)
positional_delete_table = data_file_table.filter(equality_deletes)
```
This looks like a table we get after filtering out all the rows matching equality delete values.
Are you suggesting a naming change here?
Yes. The name doesn't reflect what it really is.
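The hunk under discussion filters the data-file table down to rows whose primary keys appear in the equality deletes, producing positional delete records. A simplified, stdlib-only sketch of the same semantics (all names here are illustrative, not the PR's actual code):

```python
# Illustrative sketch of equality-delete -> positional-delete conversion,
# using plain Python rows instead of Arrow tables. Names are hypothetical.

def to_positional_deletes(data_file_rows, equality_delete_keys):
    """For each row in a data file whose primary key appears in the
    equality-delete set, emit a (file_path, position) delete record."""
    deletes = []
    for pos, row in enumerate(data_file_rows):
        if row["primarykey"] in equality_delete_keys:
            deletes.append({"file_path": row["file_path"], "pos": pos})
    return deletes

rows = [
    {"file_path": "s3://bucket/data-1.parquet", "primarykey": "a"},
    {"file_path": "s3://bucket/data-1.parquet", "primarykey": "b"},
    {"file_path": "s3://bucket/data-1.parquet", "primarykey": "c"},
]
print(to_positional_deletes(rows, {"b"}))
# -> [{'file_path': 's3://bucket/data-1.parquet', 'pos': 1}]
```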
```python
from deltacat.utils.daft import _get_s3_io_config
# TODO: Use Daft SHA1 hash instead to minimize probability of data corruption
```
As long as we use SHA-1, we are at the mercy of probability. Although the chances are low, a collision can happen and cause correctness issues. I don't think we should be using any kind of hashing here to check for equality.
I think using SHA-1 is still the right choice here to keep memory requirements predictable. Although a probability of collision exists, some probability of data corruption always exists that may be outside of your control (beyond, perhaps, eventually consistent detection and rectification mechanisms).
In this case, the chance of a SHA-1 collision is likely lower than the probability of introducing corrupt results anyway due to, say, writing back results from memory corrupted by a hardware failure (e.g., given the non-zero frequency of these types of errors observed while running compaction and similar jobs at scale internally at Amazon over the past year).
If you want a middle-ground approach, perhaps choose a record count at which the probability of collision is high enough to necessitate switching (e.g., perhaps in the septillions of records, assuming we ever manage datasets that get there).
+1 for continuing to use SHA1 as the hashing approach here.
On @raghumdani's concern about using SHA1 for primary key lookup: by the birthday problem, reaching a 50% collision probability takes roughly 1.4 × 10^24 records (septillions of records in ONE bucket, which is not a reasonable expectation of our current data volume). So I don't think that will be a legitimate concern for us in the near future.
The pros of hashing primary keys are:
- Simplified memory estimation logic
- Avoiding OOM
- Efficient cluster usage; no need to account for variable-length string primary keys
- Avoiding errors in resource estimation, since we don't need a file sampling job; any error in file sampling can cause the actual job, with its SLA expectation, to fail
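A quick back-of-the-envelope check of the birthday bound discussed in this thread (the function name is illustrative): with a d-bit hash there are N = 2^d possible digests, and collision probability reaches p at roughly n ≈ sqrt(2 N ln(1/(1-p))) records.

```python
import math

# Birthday-bound estimate: how many records until a d-bit hash has
# collision probability p? n ~= sqrt(2 * 2**bits * ln(1 / (1 - p))).

def records_for_collision_probability(bits: int, p: float) -> float:
    digest_space = 2.0 ** bits
    return math.sqrt(2.0 * digest_space * math.log(1.0 / (1.0 - p)))

n_50 = records_for_collision_probability(160, 0.5)  # SHA-1 is 160 bits
print(f"{n_50:.2e}")  # on the order of 10**24 records for a 50% chance
```

This supports the "septillions of records" framing: a single hash bucket would need about 10^24 SHA-1-hashed keys before a collision becomes a coin flip.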
To my mind, correctness is the deal-breaker here, however low the probability is. We can figure out how to keep memory requirements stable as a separate problem, for which there is already a working implementation in deltacat. As a side note, we have never seen, nor will we ever see, corrupted results written back due to hardware failures, as we have checksums in Parquet and S3 performs integrity checks on multipart uploads by default. However, we have had issues corrupting RCFs due to code bugs we introduced. In the current compactor, this is already a risk; from the get-go we have had it disabled for the majority of tables, and we recently introduced the env variable SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED to disable it for all tables.
Can we implement the option to toggle instead of creating a TODO? I believe it's not too much effort (just don't call the hash() method on line 185). This PR already has a lot of tech debt, and we want to avoid creating more.
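A minimal sketch of the toggle being requested, keyed off the SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED environment variable mentioned in this thread. The helper name and wiring are hypothetical, not the PR's actual code:

```python
import hashlib
import os

# Hypothetical sketch of a hash on/off toggle (not the PR's actual code).
# When SHA1 hashing is disabled, the raw primary key is compared directly,
# trading higher (variable-length) memory use for zero collision risk.

def pk_for_comparison(primary_key: str) -> bytes:
    if os.environ.get("SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED") == "True":
        return primary_key.encode("utf-8")  # raw key: exact, variable length
    return hashlib.sha1(primary_key.encode("utf-8")).digest()  # fixed 20 bytes

os.environ["SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED"] = "True"
assert pk_for_comparison("user-123") == b"user-123"

del os.environ["SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED"]
assert len(pk_for_comparison("user-123")) == 20  # SHA-1 digest size
```

Note the point raised in the reply below still stands: removing the hash also changes how memory is estimated, since keys become variable-length.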
It's not just about not calling hash(), right? It changes how you estimate your memory resources depending on which method you're using, since you'll get variable-length string primary keys if you get rid of the hash.
Correct. That part is buggy even in the current state, as Daft would end up reading the entire pk column since you are not using the streaming reader. You already have a TODO for it.
My understanding of this conversation is that the entire pk column will not be kept in memory:

Miranda Zhu (Nov 15th, 2024 at 9:40 AM): I wonder if there is a way to apply the UDF while downloading the columns? Specifically, we'd like to only keep the columns with the UDF applied in memory and discard the original columns; that could be really useful to us too.

Jay Chia: The new execution engine should apply these in a pipelined fashion, so yes, it would happen automatically if you did something like:
Nice, you can add a TODO.
Force-pushed from 0550d6b to 7024222, then to b6bda23, then to 79801d1
deltacat/compute/converter/dev/example_single_merge_key_converter.py (review thread outdated; resolved)
```python
    table_metadata=iceberg_table.metadata,
    files_dict_list=to_be_added_files_dict_list,
)
commit_overwrite_snapshot(
```
I may have missed this in the first review. We need to gracefully handle any error from commit conflicts, which will be resolved by a new job run.
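A hedged sketch of the graceful-handling idea: retry the snapshot commit on a conflict, backing off between attempts so a concurrent writer's snapshot can land first. The exception type and commit callable are placeholders, not pyiceberg's actual API:

```python
import time

# Placeholder exception standing in for a snapshot commit conflict.
class CommitConflictError(Exception):
    pass

def commit_with_retry(commit_fn, max_attempts=3, backoff_s=0.0):
    """Call commit_fn(); on a commit conflict, back off and retry so a
    later attempt (or job run) can resolve against refreshed metadata."""
    for attempt in range(1, max_attempts + 1):
        try:
            return commit_fn()
        except CommitConflictError:
            if attempt == max_attempts:
                raise
            time.sleep(backoff_s * attempt)  # linear backoff between attempts

# Demo: fail twice with a conflict, then succeed on the third attempt.
calls = {"n": 0}
def flaky_commit():
    calls["n"] += 1
    if calls["n"] < 3:
        raise CommitConflictError("snapshot changed concurrently")
    return "committed"

print(commit_with_retry(flaky_commit))  # -> committed
```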
deltacat/compute/converter/equality_delete_to_position_delete_session.py (2 review threads outdated; resolved)
```python
def catalog(self):
    return self["catalog"]
```

New 90-line file, beginning with:

```python
from typing import Optional, Dict
from deltacat.exceptions import RetryableError
```
FYI, I have assumed you've done the due diligence to ensure the estimation is accurate; I'm not going into greater depth on this business logic.
```python
import s3fs


def get_credential():
```
I have seen this method duplicated in multiple places.
raghumdani left a comment:
Conditional approval, provided all the TODOs are taken as fast follow-ups.
Converter to-be-implemented features list, tracked here for future PR reference:
P0. Multiple identifier columns: column concatenation + relevant memory estimation changes.
P1. Currently assuming one node can fit one hash bucket; adjust parallel data file download in the convert function.
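For the P0 item above (multiple identifier columns), one common approach, sketched here with a hypothetical helper rather than the PR's planned implementation, is to concatenate the identifier values into a single key before hashing. Length-prefixing each value keeps the concatenation unambiguous:

```python
import hashlib

# Illustrative sketch for combining multiple identifier columns into one
# hashable key. Length-prefixing each value avoids ambiguity: ("ab", "c")
# and ("a", "bc") must not concatenate to the same key.

def combined_key(*identifier_values: str) -> bytes:
    parts = []
    for value in identifier_values:
        encoded = value.encode("utf-8")
        parts.append(len(encoded).to_bytes(4, "big"))  # 4-byte length prefix
        parts.append(encoded)
    return hashlib.sha1(b"".join(parts)).digest()  # fixed 20-byte key

assert combined_key("ab", "c") != combined_key("a", "bc")
assert combined_key("a", "bc") == combined_key("a", "bc")
```

Because the result stays a fixed 20 bytes regardless of how many identifier columns there are, the memory-estimation change mentioned in P0 reduces to accounting for one extra hash pass per row.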
Force-pushed from cf14f41 to 013e2f4
Merged the PR, as all checks have passed.
Tracked in issue.
Commits:
* [WIP] Add eq-to-pos delete job session draft
* Update with producing file-level pos deletes
* Resolve dependency conflicts with 2.0 branch
* Add more documentation to example; code cleanup
* Bump linter version to fix linter + reformatting
Posted for overall high-level feedback.